package com.example.consumer.base;

import com.alibaba.fastjson.JSON;
import com.example.bean.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;


@Component
public class Consumer {
    //默认交换机
    @RabbitListener(queues = "helloQueue")
    public void hello(@Payload String user) {
        User user1 = JSON.parseObject(user, User.class);

        System.out.println("helloQueue消费者1--------------名称=" + user1.getName() + "-------年龄=" + user1.getAge());
    }

    //直连交换
    @RabbitListeners(@RabbitListener(bindings = @QueueBinding(value = @Queue("myDirectQueue1"), exchange = @Exchange(value = "myDirectExchange", type = ExchangeTypes.DIRECT), key = "mind.direct")))
    public void directConsumer(@Payload String user) {
        User user1 = JSON.parseObject(user, User.class);
        System.out.println("directConsumer消费者1--------------名称=" + user1.getName() + "-------年龄=" + user1.getAge());
    }

    //扇形交换机
    @RabbitListeners({@RabbitListener(bindings = @QueueBinding(value = @Queue("myFanoutQueue1"), exchange = @Exchange(value = "myFanoutExchange", type = ExchangeTypes.FANOUT), key = "")),
            @RabbitListener(bindings = @QueueBinding(value = @Queue("myFanoutQueue2"), exchange = @Exchange(value = "myFanoutExchange", type = ExchangeTypes.FANOUT), key = ""))})
    @RabbitHandler
    public void fanoutConsumer(@Payload String user, @Headers Map<String, Object> headers) {

        User user1 = JSON.parseObject(user, User.class);
        System.out.println("fanoutConsumer消费者1--------------名称=" + headers.get(AmqpHeaders.CONSUMER_QUEUE) + user1.getName() + "-------年龄=" + user1.getAge());
    }

    //
    @RabbitListeners({@RabbitListener(bindings = @QueueBinding(value = @Queue("myTopicQueue1"), exchange = @Exchange(value = "myTopicExchange", type = ExchangeTypes.TOPIC), key = "*.topic.com"))
            , @RabbitListener(bindings = @QueueBinding(value = @Queue("myTopicQueue2"), exchange = @Exchange(value = "myTopicExchange", type = ExchangeTypes.TOPIC), key = "#"))})
    @RabbitHandler
    public void topicConsumer(@Payload String user, @Headers Map<String, Object> headers) {
        User user1 = JSON.parseObject(user, User.class);
        System.out.println("topicConsumer消费者1--------------名称=" + headers.get(AmqpHeaders.CONSUMER_QUEUE) + user1.getName() + "-------年龄=" + user1.getAge());
    }

    //测试死信

//            设置成manual手动确认，一定要对消息做出应答，否则rabbit认为当前队列没有消费完成，将不再继续向该队列发送消息。
//            1.channel.basicAck(long,boolean); 确认收到消息，消息将被队列移除，false只确认当前consumer一个消息收到，true确认所有consumer获得的消息。
//            2.channel.basicNack(long,boolean,boolean); 确认否定消息，第一个boolean表示一个consumer还是所有，第二个boolean表示requeue是否重新回到队列，true重新入队。
//            3.channel.basicReject(long,boolean); 拒绝消息，requeue=false 表示不再重新入队，如果配置了死信队列则进入死信队列。
//            4.当消息回滚到消息队列时，这条消息不会回到队列尾部，而是仍是在队列头部，这时消费者会又接收到这条消息，如果想消息进入队尾，须确认消息后再次发送消息。
    // @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Integer") 存活时间
    @RabbitListeners({
            @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myLivequeue", arguments = {
                    @Argument(name = "x-dead-letter-exchange", value = "myDeadExchange"), @Argument(name = "x-dead-letter-routing-key", value = "deadKey"),
                    @Argument(name = "x-max-length", value = "3", type = "java.lang.Integer"
                    )
            }), exchange = @Exchange(value = "myLiveExchange", type = ExchangeTypes.DIRECT), key = {"error", "info", "warning"})),
            @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myLivequeue2"), exchange = @Exchange(value = "myLiveExchange", type = ExchangeTypes.DIRECT), key = "error")),
    })
    //消息被Consumer拒收，并且reject方法里的参数requeue为false，则消息不会被重新放回队列，不会被其他消费者消费。
    //2、消息以及队列的过期时间到了
    //3、队列的长度满了，排在前面的消息会被丢弃或者扔到死信路由上面。
   @RabbitHandler
    public void liveConsumer(Message message, Channel channel) throws IOException {

        User user1 =    JSON.parseObject( new String(message.getBody()),User.class);


        try {
//            System.out.println("liveConsumer消费者1--------------名称=" + message.getMessageProperties().getHeaders().get(AmqpHeaders.CONSUMER_QUEUE) + user1.getName() + "-------年龄=" + user1.getAge());
//            System.out.println("消息确认,消费成功！");

            // 确认收到消息，false只确认当前consumer一个消息收到，true确认所有consumer获得的消息
//        String ok= redis.get("msg"+messageId);//从redis中获取是否消费过的id
//        if (ok=="sucess"){  //消费过了的消息直接丢弃然后返回。
//            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, false);
//            return;
//        }
            //用一个消息消费表来记录每一条消息，给每个一个消息设置一个id(uuid)，消费了就保存到表中去。消息过来的时候先查询是否已经消费。消息的幂等性。
            send();
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("消息确认");
            //   redis.set("msg"+message.getMessageProperties().getMessageId(),"success")
        } catch (RuntimeException e) {
             //   判断是否以前收过消息但是未确认
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("消息已重复处理失败两次,拒绝再次接收，并将扔入死信");
                //存储持续报错信息到日志 或者数据库
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("消息一场，即将再次返回队列处理");
                // requeue为是否重新回到队列，true重新入队 为true则会再次消费一次
                // 当消息重新投递到消息队列时，这条消息不会回到队列尾部，仍是在队列头部。
                //消费者会立刻消费这条消息，业务处理再抛出异常，消息再重新入队，如此反复进行。导致消息队列处理出现阻塞，
                // 导致正常消息也无法运行。而我们当时的解决方案是，先将消息进行应答，此时消息队列会删除该条消息，同时我们再次发送该消息到消息队列，异常消息就放在了消息队列尾部，这样既保证消息不会丢失，又保证了正常业务的进行。
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                //优质解决 可以设置重试次数，超过重试次数放入死信队列
            }
        }
    }
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "myDeadQueue"),exchange = @Exchange(value = "myDeadExchange",type = ExchangeTypes.DIRECT),key = "deadKey"))
    @RabbitHandler
    public void xdlConsumer(Message message, Channel channel) throws IOException {
        User user1 =    JSON.parseObject( new String(message.getBody()),User.class);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        System.out.println("liveConsumer消费者1--------------名称=" + message.getMessageProperties().getHeaders().get(AmqpHeaders.CONSUMER_QUEUE) + user1.getName() + "-------年龄=" + user1.getAge());
    }
  private static  void   send (){
      System.out.println("业务执行，xiao消息报错");
      throw new RuntimeException("业务报错！！！！！！！！！");
  }

}
